{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "dc557e84",
   "metadata": {},
   "source": [
    "# Build a Regular RAG Document Ingestion Pipeline (No Ray Required)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "8653cde5",
   "metadata": {},
   "source": [
    "\n",
    "## Introduction\n",
    "\n",
    "In this tutorial, we will build a document processing pipeline that accomplishes the following:\n",
    "\n",
    "- Load various document formats (PDF, DOCX, PPTX, HTML, TXT) using Unstructured IO.\n",
    "- Apply a text chunking strategy (fixed or recursive) to break the text into smaller pieces for further processing.\n",
    "- Embed each text chunk using an embedding model.\n",
    "- Store the embeddings in Chroma DB to enable document retrieval.\n",
    "\n",
    "Here is the diagram of the architecture:\n",
    "\n",
    "<img src=\"https://raw.githubusercontent.com/ray-project/ray/refs/heads/master/doc/source/ray-overview/examples/e2e-rag/images/rag-data-ingestion.png\" width=800>\n",
    "\n",
    "This pipeline prepares your data for downstream retrieval tasks, and it handles all of these document types through the same loading, chunking, and embedding steps.\n",
    "\n",
    "This tutorial doesn't require Ray; you only need familiarity with RAG and unstructured document ingestion. **You can skip this tutorial if you are already familiar with the RAG document ingestion process.**\n",
    "\n",
    "In Notebook #2, we show how to use Ray to scale this process to massive document collections.\n",
    "\n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "2ef47d18",
   "metadata": {},
   "source": [
    "<div class=\"alert alert-block alert-warning\">\n",
    "  <b>Anyscale-Specific Configuration</b>\n",
    "  \n",
    "  <p>Note: This tutorial is optimized for the Anyscale platform. When running on open source Ray, additional configuration is required. For example, you’ll need to manually:</p>\n",
    "  \n",
    "  <ul>\n",
    "    <li>\n",
    "      <b>Configure your Ray Cluster:</b> Set up your multi-node environment (including head and worker nodes) and manage resource allocation (e.g., autoscaling, GPU/CPU assignments) without the Anyscale automation. See the Ray Cluster Setup documentation for details: <a href=\"https://docs.ray.io/en/latest/cluster/getting-started.html\">https://docs.ray.io/en/latest/cluster/getting-started.html</a>.\n",
    "    </li>\n",
    "    <li>\n",
    "      <b>Manage Dependencies:</b> Install and manage dependencies on each node, because you won’t have Anyscale’s Docker-based dependency management. Refer to the Ray guide on environment dependencies for instructions on managing dependencies in your environment: <a href=\"https://docs.ray.io/en/latest/ray-core/handling-dependencies.html\">https://docs.ray.io/en/latest/ray-core/handling-dependencies.html</a>.\n",
    "    </li>\n",
    "    <li>\n",
    "      <b>Set Up Storage:</b> Configure your own distributed or shared storage system (instead of relying on Anyscale’s integrated cluster storage). See the Ray Train guide on configuring persistent storage for suggestions on setting up shared storage: <a href=\"https://docs.ray.io/en/latest/train/user-guides/persistent-storage.html\">https://docs.ray.io/en/latest/train/user-guides/persistent-storage.html</a>.\n",
    "    </li>\n",
    "  </ul>\n",
    "\n",
    "</div>\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "4314276c",
   "metadata": {},
   "source": [
    "## Document Loading with Unstructured IO\n",
    "\n",
    "Within the workspace, the folder named **`anyscale-jobs-docs`** contains **5** documents in various formats, including PDF, PPTX, HTML, TXT, and DOCX.\n",
    "\n",
    "We define a function that loads and partitions these documents using the Unstructured IO library for parsing. For more information, please visit: https://docs.unstructured.io/welcome.\n",
    "\n",
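    "The function recursively scans the given directory, partitions each supported file into elements, and groups the elements' text by page number. Elements without a page number are assigned to page 1. Each page record carries metadata such as the source path, file name, file type, page number, and a document ID shared by all pages of the same file."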
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e64c73c0",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "import uuid\n",
    "from pathlib import Path\n",
    "from typing import Dict, List\n",
    "\n",
    "\n",
    "# Importing our document partitioner from unstructured.io\n",
    "from unstructured.partition.auto import partition\n",
    "\n",
    "\n",
    "def process_documents_pages(source_dir: str) -> List[Dict]:\n",
    "    \"\"\"\n",
    "    Load documents from a given directory using Unstructured IO and group text by page number.\n",
    "\n",
    "    Args:\n",
    "        source_dir (str): Directory containing documents.\n",
    "\n",
    "    Returns:\n",
    "        List[Dict]: A list of dictionaries where each dictionary contains:\n",
    "                    - 'text': combined text from a page,\n",
    "                    - 'source': file path,\n",
    "                    - 'file_name': file name with extension,\n",
    "                    - 'file_type': file extension,\n",
    "                    - 'page_number': page number,\n",
    "                    - 'doc_id': a unique id for the entire file.\n",
    "    \"\"\"\n",
    "    pages = []\n",
    "    for file_path in Path(source_dir).rglob('*'):\n",
    "        if file_path.suffix.lower() in ('.pdf', '.docx', '.pptx', '.html', '.txt'):\n",
    "            print(\"processing file:\", file_path)\n",
    "            elements = partition(str(file_path))\n",
    "            doc_id = str(uuid.uuid4())  # Single doc_id per file\n",
    "\n",
    "            # Group text by page number\n",
    "            page_texts = {}\n",
    "            for el in elements:\n",
    "                # Get the page number; default to 1 if not provided\n",
    "                page_number = getattr(el.metadata, \"page_number\", None)\n",
    "                page_number = int(page_number) if page_number is not None else 1\n",
    "                \n",
    "                # Append element text to the corresponding page's list\n",
    "                page_texts.setdefault(page_number, []).append(str(el))\n",
    "            \n",
    "            # Create a document entry for each page\n",
    "            for page_number, texts in page_texts.items():\n",
    "                combined_text = \" \".join(texts).strip()\n",
    "                pages.append({\n",
    "                    \"text\": combined_text,\n",
    "                    \"source\": str(file_path),\n",
    "                    \"file_name\": file_path.name,\n",
    "                    \"file_type\": file_path.suffix,\n",
    "                    \"page_number\": page_number,\n",
    "                    \"doc_id\": doc_id\n",
    "                })\n",
    "    return pages\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "0a2864b2-d41a-46f1-b33d-6da9231a55e6",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "processing file: anyscale-jobs-docs/Create_and_manage_jobs.pdf\n",
      "processing file: anyscale-jobs-docs/Job_queues.pptx\n",
      "processing file: anyscale-jobs-docs/Monitor_a_job.docx\n",
      "processing file: anyscale-jobs-docs/Jobs.txt\n",
      "processing file: anyscale-jobs-docs/Job_schedules.html\n",
      "File Name: Create_and_manage_jobs.pdf\n",
      "Page Number: 1\n",
      "Text: 2/12/25, 9:48 AM Create and manage jobs | Anyscale Docs Create and manage jobs Submitting a job To submit your job to Anyscale, use the Python SDK or CLI and pass in any additional options or configur...\n",
      "--------------------------------------------------------------------------------\n",
      "File Name: Create_and_manage_jobs.pdf\n",
      "Page Number: 2\n",
      "Text: 2/12/25, 9:48 AM Create and manage jobs | Anyscale Docs Defining a job With the CLI, you can define jobs in a YAML file and submit them by referencing the YAML: anyscale job submit --config-file confi...\n",
      "--------------------------------------------------------------------------------\n",
      "File Name: Create_and_manage_jobs.pdf\n",
      "Page Number: 3\n",
      "Text: 2/12/25, 9:48 AM Create and manage jobs | Anyscale Docs anyscale job terminate --id 'prodjob_...' For more information on terminating jobs with the CLI, see the reference docs. Archiving a job Archivi...\n",
      "--------------------------------------------------------------------------------\n",
      "File Name: Create_and_manage_jobs.pdf\n",
      "Page Number: 4\n",
      "Text: 2/12/25, 9:48 AM Create and manage jobs | Anyscale Docs 2. When submitting your job, include the -r or --requirements flag: CLI Python SDK anyscale job submit --config-file job.yaml -r ./requirements....\n",
      "--------------------------------------------------------------------------------\n",
      "File Name: Create_and_manage_jobs.pdf\n",
      "Page Number: 5\n",
      "Text: 2/12/25, 9:48 AM Create and manage jobs | Anyscale Docs Using pre-built custom images For frequently used environments, you can build and reuse custom images: 1. Build the image: CLI Python SDK anysca...\n",
      "--------------------------------------------------------------------------------\n",
      "File Name: Job_queues.pptx\n",
      "Page Number: 1\n",
      "Text: 2/12/25, 9:48 AM\tJob queues | Anyscale Docs Job queues A job queue enables sophisticated scheduling and execution algorithms for Anyscale Jobs. This feature improves resource utilization and reduces p...\n",
      "--------------------------------------------------------------------------------\n",
      "File Name: Job_queues.pptx\n",
      "Page Number: 2\n",
      "Text: 2/12/25, 9:48 AM\tJob queues | Anyscale Docs entrypoint: python hello_world.py working_dir: \"https://github.com/anyscale/docs_examples/archive/refs/heads/main.zip\" name: JOB_NAME # Use compute_config a...\n",
      "--------------------------------------------------------------------------------\n",
      "File Name: Job_queues.pptx\n",
      "Page Number: 3\n",
      "Text: 2/12/25, 9:48 AM config to associate with the existing queue. Job queues | Anyscale Docs The submission will fail if you submit a job with the same job queue name but a different job_queue_spec , comp...\n",
      "--------------------------------------------------------------------------------\n",
      "File Name: Job_queues.pptx\n",
      "Page Number: 4\n",
      "Text: 2/12/25, 9:48 AM\tJob queues | Anyscale Docs To terminate all running jobs in the queue, use the Terminate running jobs button on the upper right corner of the Job queue page. Note that Anyscale doesn'...\n",
      "--------------------------------------------------------------------------------\n",
      "File Name: Job_queues.pptx\n",
      "Page Number: 5\n",
      "Text: 2/12/25, 9:48 AM Job queues | Anyscale Docs Open the Terminal tab and run ray job stop 'raysubmit_...' . https://docs.anyscale.com/platform/jobs/job-queues 3/5...\n",
      "--------------------------------------------------------------------------------\n",
      "File Name: Monitor_a_job.docx\n",
      "Page Number: 1\n",
      "Text: Monitor a job Anyscale jobs provides several tools to monitor your jobs: Job detail page Metrics Logs Alerts Ray Dashboard Exporting logs and metrics This document describes each use case and provides...\n",
      "--------------------------------------------------------------------------------\n",
      "File Name: Jobs.txt\n",
      "Page Number: 1\n",
      "Text: 2/12/25, 9:48 AM Jobs | Anyscale Docs Jobs Run discrete workloads in production such as batch inference, bulk embeddings generation, or model fine-tuning. Anyscale Jobs allow you to submit application...\n",
      "--------------------------------------------------------------------------------\n",
      "File Name: Job_schedules.html\n",
      "Page Number: 1\n",
      "Text: Create and manage jobs Submitting a job​ To submit your job to Anyscale, use the Python SDK or CLI and pass in any additional options or configurations for the job. By default, Anyscale uses your work...\n",
      "--------------------------------------------------------------------------------\n"
     ]
    }
   ],
   "source": [
    "source_directory = \"./anyscale-jobs-docs\"  # Replace with your actual folder path\n",
    "pages = process_documents_pages(source_directory)\n",
    "\n",
    "for page in pages:\n",
    "    print(f\"File Name: {page['file_name']}\")\n",
    "    print(f\"Page Number: {page['page_number']}\")\n",
    "    print(f\"Text: {page['text'][:200]}...\")  # Show only the first 200 characters\n",
    "    print(\"-\" * 80)  # Separator for readability\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "f01f71fc",
   "metadata": {},
   "source": [
    "## Chunking Strategies\n",
    "\n",
    "This section divides each page into smaller, manageable text chunks using LangChain text splitters. The `ChunkingStrategy` class splits text with either a \"fixed\" or a \"recursive\" method, with configurable parameters such as the tiktoken encoding, chunk size, and chunk overlap. Because the splitters are created with `from_tiktoken_encoder`, both `chunk_size` and `chunk_overlap` are measured in tokens rather than characters.\n"
   ]
  },
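  {
   "cell_type": "markdown",
   "id": "a3f1c2d4",
   "metadata": {},
   "source": [
    "To make `chunk_size` and `chunk_overlap` concrete, here is a minimal sketch of a strictly fixed-size sliding window. It is a simplified illustration and does not reproduce what LangChain does internally. `fixed_window_chunks` is a hypothetical helper, and it counts whitespace-separated words instead of tiktoken tokens. Each window starts `chunk_size - chunk_overlap` tokens after the previous one, so consecutive chunks share `chunk_overlap` tokens. LangChain's `CharacterTextSplitter` (the \"fixed\" method) works differently. It splits on a single separator, a blank line by default, and then merges the pieces up to `chunk_size`. On page text joined with spaces, it may therefore return chunks longer than `chunk_size`. `RecursiveCharacterTextSplitter` avoids this by falling back to finer separators (single newlines, then spaces, then individual characters) until the pieces fit.\n",
    "\n",
    "```python\n",
    "from typing import List\n",
    "\n",
    "\n",
    "def fixed_window_chunks(text: str, chunk_size: int = 300, chunk_overlap: int = 50) -> List[str]:\n",
    "    # Hypothetical helper: whitespace-separated words stand in for tiktoken tokens.\n",
    "    if not 0 <= chunk_overlap < chunk_size:\n",
    "        raise ValueError('chunk_overlap must be >= 0 and smaller than chunk_size')\n",
    "    tokens = text.split()\n",
    "    step = chunk_size - chunk_overlap  # distance between the starts of consecutive windows\n",
    "    chunks = []\n",
    "    for start in range(0, len(tokens), step):\n",
    "        chunks.append(' '.join(tokens[start:start + chunk_size]))\n",
    "        if start + chunk_size >= len(tokens):\n",
    "            break  # this window already reaches the end of the text\n",
    "    return chunks\n",
    "\n",
    "\n",
    "# Usage: 10 tokens, windows of 4 with an overlap of 1, so windows start at 0, 3, and 6.\n",
    "sample = 't0 t1 t2 t3 t4 t5 t6 t7 t8 t9'\n",
    "for chunk in fixed_window_chunks(sample, chunk_size=4, chunk_overlap=1):\n",
    "    print(chunk)\n",
    "```\n",
    "\n",
    "This prints `t0 t1 t2 t3`, `t3 t4 t5 t6`, and `t6 t7 t8 t9`. Each chunk repeats the last token of the one before it."
   ]
  },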
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "2577d906",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "## doc: https://python.langchain.com/v0.1/docs/modules/data_connection/document_transformers/\n",
    "\n",
    "from typing import List\n",
    "# Importing various text splitters for different chunking strategies\n",
    "from langchain_text_splitters import (\n",
    "    RecursiveCharacterTextSplitter,\n",
    "    CharacterTextSplitter\n",
    ")\n",
    "\n",
    "class ChunkingStrategy:\n",
    "    def __init__(self, method: str = 'recursive', encoding_name: str = \"cl100k_base\", \n",
    "                 chunk_size: int = 300, chunk_overlap: int = 50):\n",
    "        \"\"\"\n",
    "        Initialize a chunking strategy.\n",
    "\n",
    "        Args:\n",
    "            method (str): The chunking method, either 'fixed' or 'recursive'.\n",
    "            encoding_name (str): The tiktoken encoding used to measure chunk length.\n",
    "            chunk_size (int): The target maximum size of each chunk, in tokens.\n",
    "            chunk_overlap (int): The number of tokens shared by consecutive chunks.\n",
    "        \"\"\"\n",
    "        self.method = method\n",
    "        self.encoding_name = encoding_name\n",
    "        self.chunk_size = chunk_size\n",
    "        self.chunk_overlap = chunk_overlap\n",
    "\n",
    "    def chunk_document(self, text: str) -> List[str]:\n",
    "        \"\"\"\n",
    "        Chunk a document's text using the selected strategy.\n",
    "\n",
    "        Args:\n",
    "            text (str): The document's text to chunk.\n",
    "\n",
    "        Returns:\n",
    "            List[str]: A list of text chunks.\n",
    "        \"\"\"\n",
    "        if self.method == 'fixed':\n",
    "            splitter = CharacterTextSplitter.from_tiktoken_encoder(\n",
    "                encoding_name=self.encoding_name, \n",
    "                chunk_size=self.chunk_size, \n",
    "                chunk_overlap=self.chunk_overlap\n",
    "            )\n",
    "            return splitter.split_text(text)\n",
    "        elif self.method == 'recursive':\n",
    "            splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(\n",
    "                encoding_name=self.encoding_name, \n",
    "                chunk_size=self.chunk_size, \n",
    "                chunk_overlap=self.chunk_overlap\n",
    "            )\n",
    "            return splitter.split_text(text)\n",
    "        else:\n",
    "            raise ValueError(\"Unknown chunking method: choose 'fixed' or 'recursive'.\")\n",
    "    "
   ]
  },
  {
   "cell_type": "markdown",
   "id": "3ca7bf02-10dc-4057-a2eb-09e801fc2c74",
   "metadata": {
    "tags": []
   },
   "source": [
    "## Test the Chunking Strategy Implementation\n",
    "\n",
    "Now let's put the chunking process into action on a single page:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1c65926e-43d6-42e7-9fd0-50fc810ca870",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Debug - Page Text: Monitor a job Anyscale jobs provides several tools to monitor your jobs: Job detail page Metrics Logs Alerts Ray Dashboard Exporting logs and metrics This document describes each use case and provides suggestions for when to use each tool. Job detail page The job detail page contains the status of the job, information about your job's configuration, details about each job attempt, events of the job, and links to various other tools. The job events log is at the bottom of the page. This log lists events of your job aAnsdkinAcIludes events about your job lifecycle and errors. Metrics Access metrics related to your job in the Metrics tab of the job detail page. Job metrics tracks hardware metrics and system-level metrics such as CPU or network utilization, memory, or disk usage, node count, number of Ray tasks, and number of active Ray actors. Metrics are also available in Grafana for a more advanced UI, which allows you to create custom dashboards for visualizing the metrics, including custom metrics. Access Grafana by clicking the View in Grafana button in the Metrics tab. Logs Logs are another source of information when debugging issues with your job. You can view the logs of your job by clicking the \"Logs\" tab in the job detail page. By default, you can will see the driver logs of your job. If the job is still running, you can also view the Ray logs of the job through the Ray Dashboard. Log viewer If you have enabled log ingestion, you have access to the Anyscale log viewer With the Anyscale log viewer, you have access to all Ray logs of your jobs and can search and filter by time, text, or labels such as task name, node ID, and more. By default, the logs are filtered to the time range of the job with no filters. You can change the time range by clicking the time range dropdown and select an end time and time window to look back. Anyscale stores up to 30 days of logs for your job. 
You're able to debug issues even after the job terminates. To filter the logs, use the search bar to search for specific keywords. Enter a request ID in the search bar to filter logs for a specific request. You can also use contain a specific pattern. Alerts to filter logs if your logs Anyscale jobs have a built-in alert for when a job succeeds or fails. The creator of the job receives an email notification when the job completes. To set up additional alerts based on your own criteria, see Custom dashboards and alerting guide. These alerts are useful for tracking the health of your jobs or job queues. Ray Dashboard The Ray Dashboard is scoped to a single Ray cluster. Each job attempt launches a new Ray cluster unless Job queues are used. To access this dashboard, click the \"Ray Dashboard\" tab in the job detail page. To learn more about how to use the Ray Dashboard, see the Ray documentation. Exporting logs and metrics If you want to push logs to Vector, a tool to ship logs to Amazon CloudWatch, Google Cloud Monitoring, Datadog, or other observability tools, see Exporting logs and metrics with Vector. More info To learn more details about the Ray Dashboard, see the Ray Dashboard documentation To learn more about Grafana and how to use it, see the official Grafana documentation To learn more about the metrics that Ray emits, see the System Metrics documentation\n",
      "File: Monitor_a_job.docx (Page Number: 1)\n",
      "Number of chunks: 3\n",
      "--------------------------------------------------------------------------------\n",
      "Chunk 1:\n",
      "Monitor a job Anyscale jobs provides several tools to monitor your jobs: Job detail page Metrics Logs Alerts Ray Dashboard Exporting logs and metrics This document describes each use case and provides suggestions for when to use each tool. Job detail page The job detail page contains the status of the job, information about your job's configuration, details about each job attempt, events of the job, and links to various other tools. The job events log is at the bottom of the page. This log lists events of your job aAnsdkinAcIludes events about your job lifecycle and errors. Metrics Access metrics related to your job in the Metrics tab of the job detail page. Job metrics tracks hardware metrics and system-level metrics such as CPU or network utilization, memory, or disk usage, node count, number of Ray tasks, and number of active Ray actors. Metrics are also available in Grafana for a more advanced UI, which allows you to create custom dashboards for visualizing the metrics, including custom metrics. Access Grafana by clicking the View in Grafana button in the Metrics tab. Logs Logs are another source of information when debugging issues with your job. You can view the logs of your job by clicking the \"Logs\" tab in the job detail page. By default, you can will see the driver logs of your job. If the job is still running, you can also view the Ray logs of the job through the Ray Dashboard. Log viewer If you have enabled log ingestion, you have access\n",
      "--------------------------------------------------------------------------------\n",
      "Chunk 2:\n",
      "page. By default, you can will see the driver logs of your job. If the job is still running, you can also view the Ray logs of the job through the Ray Dashboard. Log viewer If you have enabled log ingestion, you have access to the Anyscale log viewer With the Anyscale log viewer, you have access to all Ray logs of your jobs and can search and filter by time, text, or labels such as task name, node ID, and more. By default, the logs are filtered to the time range of the job with no filters. You can change the time range by clicking the time range dropdown and select an end time and time window to look back. Anyscale stores up to 30 days of logs for your job. You're able to debug issues even after the job terminates. To filter the logs, use the search bar to search for specific keywords. Enter a request ID in the search bar to filter logs for a specific request. You can also use contain a specific pattern. Alerts to filter logs if your logs Anyscale jobs have a built-in alert for when a job succeeds or fails. The creator of the job receives an email notification when the job completes. To set up additional alerts based on your own criteria, see Custom dashboards and alerting guide. These alerts are useful for tracking the health of your jobs or job queues. Ray Dashboard The Ray Dashboard is scoped to a single Ray cluster. Each job attempt launches a new Ray cluster unless Job queues\n",
      "--------------------------------------------------------------------------------\n",
      "Chunk 3:\n",
      "criteria, see Custom dashboards and alerting guide. These alerts are useful for tracking the health of your jobs or job queues. Ray Dashboard The Ray Dashboard is scoped to a single Ray cluster. Each job attempt launches a new Ray cluster unless Job queues are used. To access this dashboard, click the \"Ray Dashboard\" tab in the job detail page. To learn more about how to use the Ray Dashboard, see the Ray documentation. Exporting logs and metrics If you want to push logs to Vector, a tool to ship logs to Amazon CloudWatch, Google Cloud Monitoring, Datadog, or other observability tools, see Exporting logs and metrics with Vector. More info To learn more details about the Ray Dashboard, see the Ray Dashboard documentation To learn more about Grafana and how to use it, see the official Grafana documentation To learn more about the metrics that Ray emits, see the System Metrics documentation\n",
      "--------------------------------------------------------------------------------\n"
     ]
    }
   ],
   "source": [
    "## Test the Chunking Strategy Implementation\n",
    "\n",
    "# Create a ChunkingStrategy instance with the desired settings.\n",
    "chunker = ChunkingStrategy(chunk_size=300, chunk_overlap=50)\n",
    "\n",
    "# Retrieve one page from the pages list (index 10, the 11th page; the file order depends on the file system).\n",
    "page = pages[10]\n",
    "file_name = page[\"file_name\"]\n",
    "file_type = page[\"file_type\"]\n",
    "page_number = page[\"page_number\"]\n",
    "text_content = page[\"text\"]\n",
    "\n",
    "# Debug: Print the text content of the selected page.\n",
    "print(\"Debug - Page Text:\", text_content)\n",
    "\n",
    "# Chunk the document text using the chunk_document method.\n",
    "chunks = chunker.chunk_document(text_content)\n",
    "\n",
    "# Display file and page details along with the number of chunks generated.\n",
    "print(f\"File: {file_name} (Page Number: {page_number})\")\n",
    "print(f\"Number of chunks: {len(chunks)}\")\n",
    "print(\"-\" * 80)\n",
    "\n",
    "# Iterate through each chunk and print its contents.\n",
    "for idx, chunk in enumerate(chunks):\n",
    "    print(f\"Chunk {idx + 1}:\")\n",
    "    print(chunk)\n",
    "    print(\"-\" * 80)"
   ]
  },
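  {
   "cell_type": "markdown",
   "id": "b7e2d9c1",
   "metadata": {},
   "source": [
    "In the output above, each chunk begins by repeating the end of the previous one. That repeated span is the overlap. As a quick sanity check, the sketch below counts how many words at the end of one chunk reappear at the start of the next. The helpers `word_overlap` and `report_overlaps` are hypothetical. They count whitespace-separated words, so the numbers only roughly track `chunk_overlap`, which is measured in tiktoken tokens.\n",
    "\n",
    "```python\n",
    "from typing import List\n",
    "\n",
    "\n",
    "def word_overlap(previous: str, current: str) -> int:\n",
    "    # Length of the longest word run that ends `previous` and also starts `current`.\n",
    "    prev_words = previous.split()\n",
    "    curr_words = current.split()\n",
    "    for size in range(min(len(prev_words), len(curr_words)), 0, -1):\n",
    "        if prev_words[-size:] == curr_words[:size]:\n",
    "            return size\n",
    "    return 0\n",
    "\n",
    "\n",
    "def report_overlaps(chunks: List[str]) -> List[int]:\n",
    "    # Overlap, in words, between each pair of consecutive chunks.\n",
    "    return [word_overlap(a, b) for a, b in zip(chunks, chunks[1:])]\n",
    "\n",
    "\n",
    "# Toy chunks; with the notebook data, call report_overlaps(chunks) instead.\n",
    "toy_chunks = ['a b c d', 'c d e f', 'e f g']\n",
    "print(report_overlaps(toy_chunks))  # [2, 2]\n",
    "```\n",
    "\n",
    "The search starts from the longest possible run and stops at the first match. In the worst case this is quadratic in the chunk length, which is fine for inspecting a few chunks but not for a whole corpus."
   ]
  },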
  {
   "cell_type": "markdown",
   "id": "24348547",
   "metadata": {},
   "source": [
    "Now let's chunk all documents and collect the chunks along with their metadata."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "0e9c28ba-147c-4239-97bb-a6665b92d077",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Created 22 text chunks.\n"
     ]
    }
   ],
   "source": [
    "import uuid\n",
    "\n",
    "all_chunks = []\n",
    "\n",
    "for page in pages:\n",
    "    chunks = chunker.chunk_document(page[\"text\"])\n",
    "    \n",
    "    for chunk in chunks:\n",
    "        all_chunks.append({\n",
    "            \"id\": str(uuid.uuid4()),  # Generate a unique ID for each chunk\n",
    "            \"text\": chunk,\n",
    "            \"metadata\": {\n",
    "                \"source\": page[\"source\"],\n",
    "                \"doc_id\": page[\"doc_id\"],\n",
    "                \"file_name\": page[\"file_name\"],\n",
    "                \"file_type\": page[\"file_type\"],\n",
    "                \"page_number\": page[\"page_number\"],\n",
    "                \"chunk_method\": chunker.method\n",
    "            }\n",
    "        })\n",
    "\n",
    "print(f\"Created {len(all_chunks)} text chunks.\")\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "75dce24f-4378-4d70-b375-026cbc466228",
   "metadata": {
    "tags": []
   },
   "source": [
    "## Generate Embeddings\n",
    "\n",
    "Before setting up Chroma DB for vector storage, generate embeddings for your text chunks.\n",
    "We use the `intfloat/multilingual-e5-large-instruct` model, loaded through the `sentence-transformers` library."
   ]
  },
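  {
   "cell_type": "markdown",
   "id": "c4d8e6a2",
   "metadata": {},
   "source": [
    "One detail about this model matters at query time. As we understand its model card, search queries should be prefixed with a one-sentence task instruction: an `Instruct: <task>` line followed by a `Query: <query>` line. The documents being indexed are embedded as-is, so the chunks embedded below need no prefix. The sketch below shows how a query string could be built. `build_e5_query` and the task wording are our own assumptions, so check the model card for recommended phrasings.\n",
    "\n",
    "```python\n",
    "def build_e5_query(task: str, query: str) -> str:\n",
    "    # Hypothetical helper: an 'Instruct:' line, a newline, then a 'Query:' line.\n",
    "    return f'Instruct: {task}\\nQuery: {query}'\n",
    "\n",
    "\n",
    "# Assumed task wording; adapt it to your own retrieval task.\n",
    "task = 'Given a question about Anyscale Jobs, retrieve documentation passages that answer it'\n",
    "query_text = build_e5_query(task, 'How do I submit a job?')\n",
    "print(query_text)\n",
    "\n",
    "# The prefixed query would then be embedded with the same model, for example:\n",
    "# query_embedding = embed_model.encode([query_text], convert_to_numpy=True)\n",
    "```\n",
    "\n",
    "Keeping the instruction only on the query side follows the asymmetric setup the model was trained for. The chunk texts stay unchanged, so the same stored embeddings can serve different query tasks."
   ]
  },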
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "354ac2b0-91dd-4a4b-8e89-16ce11ee7eab",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "94b2870431cc4e2c9dd74f34b34d6d51",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "Batches:   0%|          | 0/1 [00:00<?, ?it/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Embeddings generated and attached to each chunk.\n"
     ]
    }
   ],
   "source": [
    "\n",
    "from sentence_transformers import SentenceTransformer\n",
    "\n",
    "# Initialize a SentenceTransformer model (choose one appropriate for your use case)\n",
    "embed_model = SentenceTransformer(\"intfloat/multilingual-e5-large-instruct\")\n",
    "\n",
    "# Gather all chunk texts for embedding generation\n",
    "chunk_texts = [chunk[\"text\"] for chunk in all_chunks]\n",
    "\n",
    "# Compute embeddings (using batch encoding for efficiency)\n",
    "embeddings = embed_model.encode(chunk_texts, convert_to_numpy=True, show_progress_bar=True)\n",
    "\n",
    "# Attach each embedding (converted to a list) to the corresponding chunk\n",
    "for i, chunk in enumerate(all_chunks):\n",
    "    chunk[\"embedding\"] = embeddings[i].tolist()\n",
    "\n",
    "print(\"Embeddings generated and attached to each chunk.\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "da2291d3",
   "metadata": {},
   "source": [
    "## Set up Chroma DB\n",
    "\n",
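    "We use ChromaDB to store and manage the computed embeddings. For demo purposes, we create a temporary directory for persistent storage, then create a ChromaDB client and a collection named \"sample_embeddings\" to hold the embeddings, their metadata, and the document texts. The temporary directory is deleted when `temp_dir.cleanup()` is called or when the object is garbage-collected, so point the client at a permanent path for real workloads."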
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "75e57855-ab85-4adf-9f23-6c5683c49b57",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Embeddings stored in ChromaDB.\n"
     ]
    }
   ],
   "source": [
    "import tempfile\n",
    "import chromadb\n",
    "\n",
    "# Create a temporary directory\n",
    "temp_dir = tempfile.TemporaryDirectory()\n",
    "temp_chroma_db_path = temp_dir.name  # This is the path for ChromaDB storage\n",
    "\n",
    "# Initialize ChromaDB client\n",
    "chroma_client = chromadb.PersistentClient(path=temp_chroma_db_path)  # Change path as needed\n",
    "collection = chroma_client.get_or_create_collection(name=\"sample_embeddings\")\n",
    "# Store embeddings in ChromaDB\n",
    "collection.add(\n",
    "    ids=[chunk[\"id\"] for chunk in all_chunks],\n",
    "    embeddings=embeddings.tolist(),\n",
    "    metadatas=[chunk[\"metadata\"] for chunk in all_chunks],\n",
    "    documents=chunk_texts\n",
    ")\n",
    "\n",
    "print(\"Embeddings stored in ChromaDB.\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "5597c2d5",
   "metadata": {},
   "source": [
    "## Embedding Search with Chroma\n",
    "\n",
    "Next, we search the embeddings stored in ChromaDB. We generate an embedding for a text query and use it to retrieve the most similar chunks from the collection, along with their metadata and distance scores (a lower distance means a closer match)."
   ]
  },
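  {
   "cell_type": "markdown",
   "id": "d9b3f7e5",
   "metadata": {},
   "source": [
    "The distances returned by the query come from the collection's distance function. As far as we know, Chroma collections use squared Euclidean (L2) distance unless you configure another space, such as cosine, when creating the collection. For unit-length vectors the two measures are tied by `squared_l2 = 2 - 2 * cosine_similarity`, so ranking by either gives the same order. The sketch below checks that relationship and runs an exact, brute-force top-k search in plain Python on toy vectors. Chroma itself uses an approximate HNSW index, and the helper names here are hypothetical.\n",
    "\n",
    "```python\n",
    "import math\n",
    "from typing import List, Sequence, Tuple\n",
    "\n",
    "\n",
    "def normalize(v: Sequence[float]) -> List[float]:\n",
    "    # Scale a vector to unit length.\n",
    "    norm = math.sqrt(sum(x * x for x in v))\n",
    "    return [x / norm for x in v]\n",
    "\n",
    "\n",
    "def squared_l2(a: Sequence[float], b: Sequence[float]) -> float:\n",
    "    # Sum of squared coordinate differences (no square root).\n",
    "    return sum((x - y) ** 2 for x, y in zip(a, b))\n",
    "\n",
    "\n",
    "def cosine(a: Sequence[float], b: Sequence[float]) -> float:\n",
    "    dot = sum(x * y for x, y in zip(a, b))\n",
    "    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))\n",
    "\n",
    "\n",
    "def top_k_by_l2(query: Sequence[float], vectors: List[Sequence[float]], k: int = 3) -> List[Tuple[int, float]]:\n",
    "    # Exact nearest neighbours as (index, squared L2 distance), closest first.\n",
    "    scored = [(i, squared_l2(query, v)) for i, v in enumerate(vectors)]\n",
    "    return sorted(scored, key=lambda pair: pair[1])[:k]\n",
    "\n",
    "\n",
    "# Toy unit vectors standing in for chunk embeddings and a query embedding.\n",
    "docs = [normalize(v) for v in ([1.0, 0.0], [0.8, 0.6], [0.0, 1.0])]\n",
    "query = normalize([1.0, 0.2])\n",
    "\n",
    "for idx, dist in top_k_by_l2(query, docs, k=2):\n",
    "    # For unit vectors, squared L2 distance equals 2 - 2 * cosine (up to rounding).\n",
    "    print(idx, round(dist, 4), round(2 - 2 * cosine(query, docs[idx]), 4))\n",
    "```\n",
    "\n",
    "With real data, `docs` would hold the chunk embeddings and `query` the embedded query. Brute force is exact, but its cost grows linearly with the number of chunks, which is why vector databases rely on approximate indexes."
   ]
  },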
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "aa762f33",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Query Results:\n",
      "{'data': None,\n",
      " 'distances': [[0.18700521448677737, 0.18982562470474848, 0.1924720483496574]],\n",
      " 'documents': [['Create and manage jobs Submitting a job\\u200b To submit your '\n",
      "                'job to Anyscale, use the Python SDK or CLI and pass in any '\n",
      "                'additional options or configurations for the job. By default, '\n",
      "                'Anyscale uses your workspace or cloud to provision a cluster '\n",
      "                'to run your job. You can define a custom cluster through a '\n",
      "                'compute config or specify an existing cluster. Once '\n",
      "                'submitted, Anyscale runs the job as specified in the '\n",
      "                'entrypoint command, which is typically a Ray Job. If the run '\n",
      "                \"doesn't succeed, the job restarts using the same entrypoint \"\n",
      "                'up to the number of max_retries. CLI Python SDK anyscale job '\n",
      "                'submit --name=my-job \\\\\\n'\n",
      "                '  --working-dir=. --max-retries=5 \\\\\\n'\n",
      "                '  --image-uri=\"anyscale/image/IMAGE_NAME:VERSION\" \\\\\\n'\n",
      "                '  --compute-config=COMPUTE_CONFIG_NAME \\\\\\n'\n",
      "                '  -- python main.py With the CLI, you can either specify an '\n",
      "                'existing compute config with '\n",
      "                '--compute-config=COMPUTE_CONFIG_NAME or define a new one in a '\n",
      "                'job YAML. For more information on submitting jobs with the '\n",
      "                'CLI, see the reference docs. import anyscale\\n'\n",
      "                'from anyscale.job.models import JobConfig\\n'\n",
      "                '\\n'\n",
      "                'config = JobConfig(\\n'\n",
      "                '  name=\"my-job\",\\n'\n",
      "                '  entrypoint=\"python main.py\",\\n'\n",
      "                '  working_dir=\".\",\\n'\n",
      "                '  max_retries=5,\\n'\n",
      "                '  image_uri=\"anyscale/image/IMAGE_NAME:VERSION\",\\n'\n",
      "                '  compute_config=\"COMPUTE_CONFIG_NAME\"\\n'\n",
      "                ')',\n",
      "                '2/12/25, 9:48 AM Create and manage jobs | Anyscale Docs '\n",
      "                'Create and manage jobs Submitting a job To submit your job to '\n",
      "                'Anyscale, use the Python SDK or CLI and pass in any '\n",
      "                'additional options or configurations for the job. By default, '\n",
      "                'Anyscale uses your workspace or cloud to provision a cluster '\n",
      "                'to run your job. You can define a custom cluster through a '\n",
      "                'compute config or specify an existing cluster. Once '\n",
      "                'submitted, Anyscale runs the job as specified in the '\n",
      "                'entrypoint command, which is typically a Ray Job. If the run '\n",
      "                \"doesn't succeed, the job restarts using the same entrypoint \"\n",
      "                'up to the number of max_retries . CLI Python SDK anyscale job '\n",
      "                'submit --name=my-job \\\\ --working-dir=. --max-retries=5 \\\\ '\n",
      "                '--image-uri=\"anyscale/image/IMAGE_NAME:VERSION\" \\\\ '\n",
      "                '--compute-config=COMPUTE_CONFIG_NAME \\\\ -- python main.py '\n",
      "                'With the CLI, you can either specify an existing compute '\n",
      "                'config with --compute- config=COMPUTE_CONFIG_NAME or define a '\n",
      "                'new one in a job YAML. For more information on submitting '\n",
      "                'jobs with the CLI, see the reference docs. TIP For '\n",
      "                'large-scale, compute-intensive jobs, avoid scheduling Ray '\n",
      "                'tasks onto the head node because it manages cluster-level '\n",
      "                'orchestration. To do that, set the CPU resource on the head '\n",
      "                'node to 0 in your compute config. Ask AI '\n",
      "                'https://docs.anyscale.com/platform/jobs/manage-jobs 1/5',\n",
      "                '2/12/25, 9:48 AM Jobs | Anyscale Docs Jobs Run discrete '\n",
      "                'workloads in production such as batch inference, bulk '\n",
      "                'embeddings generation, or model fine-tuning. Anyscale Jobs '\n",
      "                'allow you to submit applications developed on workspaces to a '\n",
      "                'standalone Ray cluster for execution. Built for production '\n",
      "                'and designed to fit into your CI/CD pipeline, jobs ensure '\n",
      "                'scalable and reliable performance. How does it work? # When '\n",
      "                'you’re ready to promote an app to production, submit a job '\n",
      "                'from the workspace using anyscale job submit . Anyscale Jobs '\n",
      "                'have the following features: Scalability: Rapid scaling to '\n",
      "                'thousands of cloud instances, adjusting computing resources '\n",
      "                'to match application demand. Fault tolerance: Retries for '\n",
      "                'failures and automatic rescheduling to an alternative cluster '\n",
      "                'for unexpected failures like running out of memory. '\n",
      "                'Monitoring and observability: Persistent dashboards that '\n",
      "                'allow you to observe tasks in real time and email alerts upon '\n",
      "                'successf ul job completion. Get started 1. Sign in or sign up '\n",
      "                'for an account. 2. Select the Intro to Jobs example. 3. '\n",
      "                'Select Launch. This example runs in a Workspace. See '\n",
      "                'Workspaces for background information. 4. Follow the notebook '\n",
      "                'or view it in the docs. 5. Terminate the Workspace when '\n",
      "                \"you're done. Ask AI https://docs.anyscale.com/platform/jobs/ \"\n",
      "                '1/2 2/12/25, 9:48 AM Jobs | Anyscale Docs '\n",
      "                'https://docs.anyscale.com/platform/jobs/']],\n",
      " 'embeddings': None,\n",
      " 'ids': [['d972e888-9aa4-4e8c-9543-ece3b21ab2f3',\n",
      "          '19a58587-1fe6-43c8-8275-be2755a0f8a1',\n",
      "          'c2bfae32-ea87-47dd-97b3-9a55ad2585d6']],\n",
      " 'included': [<IncludeEnum.distances: 'distances'>,\n",
      "              <IncludeEnum.documents: 'documents'>,\n",
      "              <IncludeEnum.metadatas: 'metadatas'>],\n",
      " 'metadatas': [[{'chunk_method': 'recursive',\n",
      "                 'doc_id': 'fc073a75-025a-4b5e-8192-7a3a73f92739',\n",
      "                 'file_name': 'Job_schedules.html',\n",
      "                 'file_type': '.html',\n",
      "                 'page_number': 1,\n",
      "                 'source': 'anyscale-jobs-docs/Job_schedules.html'},\n",
      "                {'chunk_method': 'recursive',\n",
      "                 'doc_id': '9ee4e77b-8ff9-4ea9-afc6-e67642798174',\n",
      "                 'file_name': 'Create_and_manage_jobs.pdf',\n",
      "                 'file_type': '.pdf',\n",
      "                 'page_number': 1,\n",
      "                 'source': 'anyscale-jobs-docs/Create_and_manage_jobs.pdf'},\n",
      "                {'chunk_method': 'recursive',\n",
      "                 'doc_id': 'c87a8846-5a60-4e95-9805-fe1dce18507c',\n",
      "                 'file_name': 'Jobs.txt',\n",
      "                 'file_type': '.txt',\n",
      "                 'page_number': 1,\n",
      "                 'source': 'anyscale-jobs-docs/Jobs.txt'}]],\n",
      " 'uris': None}\n"
     ]
    }
   ],
   "source": [
    "from pprint import pprint\n",
    "# Define your text query\n",
    "query_text = \"how to submit anyscale jobs\"\n",
    "\n",
    "# Generate the embedding for the query text\n",
    "query_embedding = embed_model.encode(query_text).tolist()\n",
    "\n",
    "# Query the collection for the top 3 most similar documents.\n",
    "# The 'include' parameter lets you retrieve documents, metadatas, and distances.\n",
    "results = collection.query(\n",
    "    query_embeddings=[query_embedding],\n",
    "    n_results=3,\n",
    "    include=[\"documents\", \"metadatas\", \"distances\"]\n",
    ")\n",
    "\n",
    "# Print the retrieval results\n",
    "print(\"Query Results:\")\n",
    "pprint(results)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "2f2a7d6e",
   "metadata": {},
   "source": [
    "## Reformat Chroma Search Results\n",
    "\n",
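    "Chroma returns query results as parallel lists of lists (one inner list per query embedding), which are hard to read. The `reformat` helper below flattens them into one dictionary per retrieved chunk:"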
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f04aadb2",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "refromat Results:\n",
      "[{'chunk_id': None,\n",
      "  'chunk_index': 1,\n",
      "  'distance': 0.18700521448677737,\n",
      "  'doc_id': 'fc073a75-025a-4b5e-8192-7a3a73f92739',\n",
      "  'page_number': 1,\n",
      "  'score': 0.8129947855132227,\n",
      "  'source': 'anyscale-jobs-docs/Job_schedules.html',\n",
      "  'text': 'Create and manage jobs Submitting a job\\u200b To submit your job to '\n",
      "          'Anyscale, use the Python SDK or CLI and pass in any additional '\n",
      "          'options or configurations for the job. By default, Anyscale uses '\n",
      "          'your workspace or cloud to provision a cluster to run your job. You '\n",
      "          'can define a custom cluster through a compute config or specify an '\n",
      "          'existing cluster. Once submitted, Anyscale runs the job as '\n",
      "          'specified in the entrypoint command, which is typically a Ray Job. '\n",
      "          \"If the run doesn't succeed, the job restarts using the same \"\n",
      "          'entrypoint up to the number of max_retries. CLI Python SDK anyscale '\n",
      "          'job submit --name=my-job \\\\\\n'\n",
      "          '  --working-dir=. --max-retries=5 \\\\\\n'\n",
      "          '  --image-uri=\"anyscale/image/IMAGE_NAME:VERSION\" \\\\\\n'\n",
      "          '  --compute-config=COMPUTE_CONFIG_NAME \\\\\\n'\n",
      "          '  -- python main.py With the CLI, you can either specify an '\n",
      "          'existing compute config with --compute-config=COMPUTE_CONFIG_NAME '\n",
      "          'or define a new one in a job YAML. For more information on '\n",
      "          'submitting jobs with the CLI, see the reference docs. import '\n",
      "          'anyscale\\n'\n",
      "          'from anyscale.job.models import JobConfig\\n'\n",
      "          '\\n'\n",
      "          'config = JobConfig(\\n'\n",
      "          '  name=\"my-job\",\\n'\n",
      "          '  entrypoint=\"python main.py\",\\n'\n",
      "          '  working_dir=\".\",\\n'\n",
      "          '  max_retries=5,\\n'\n",
      "          '  image_uri=\"anyscale/image/IMAGE_NAME:VERSION\",\\n'\n",
      "          '  compute_config=\"COMPUTE_CONFIG_NAME\"\\n'\n",
      "          ')'},\n",
      " {'chunk_id': None,\n",
      "  'chunk_index': 2,\n",
      "  'distance': 0.18982562470474848,\n",
      "  'doc_id': '9ee4e77b-8ff9-4ea9-afc6-e67642798174',\n",
      "  'page_number': 1,\n",
      "  'score': 0.8101743752952515,\n",
      "  'source': 'anyscale-jobs-docs/Create_and_manage_jobs.pdf',\n",
      "  'text': '2/12/25, 9:48 AM Create and manage jobs | Anyscale Docs Create and '\n",
      "          'manage jobs Submitting a job To submit your job to Anyscale, use '\n",
      "          'the Python SDK or CLI and pass in any additional options or '\n",
      "          'configurations for the job. By default, Anyscale uses your '\n",
      "          'workspace or cloud to provision a cluster to run your job. You can '\n",
      "          'define a custom cluster through a compute config or specify an '\n",
      "          'existing cluster. Once submitted, Anyscale runs the job as '\n",
      "          'specified in the entrypoint command, which is typically a Ray Job. '\n",
      "          \"If the run doesn't succeed, the job restarts using the same \"\n",
      "          'entrypoint up to the number of max_retries . CLI Python SDK '\n",
      "          'anyscale job submit --name=my-job \\\\ --working-dir=. '\n",
      "          '--max-retries=5 \\\\ --image-uri=\"anyscale/image/IMAGE_NAME:VERSION\" '\n",
      "          '\\\\ --compute-config=COMPUTE_CONFIG_NAME \\\\ -- python main.py With '\n",
      "          'the CLI, you can either specify an existing compute config with '\n",
      "          '--compute- config=COMPUTE_CONFIG_NAME or define a new one in a job '\n",
      "          'YAML. For more information on submitting jobs with the CLI, see the '\n",
      "          'reference docs. TIP For large-scale, compute-intensive jobs, avoid '\n",
      "          'scheduling Ray tasks onto the head node because it manages '\n",
      "          'cluster-level orchestration. To do that, set the CPU resource on '\n",
      "          'the head node to 0 in your compute config. Ask AI '\n",
      "          'https://docs.anyscale.com/platform/jobs/manage-jobs 1/5'},\n",
      " {'chunk_id': None,\n",
      "  'chunk_index': 3,\n",
      "  'distance': 0.1924720483496574,\n",
      "  'doc_id': 'c87a8846-5a60-4e95-9805-fe1dce18507c',\n",
      "  'page_number': 1,\n",
      "  'score': 0.8075279516503426,\n",
      "  'source': 'anyscale-jobs-docs/Jobs.txt',\n",
      "  'text': '2/12/25, 9:48 AM Jobs | Anyscale Docs Jobs Run discrete workloads '\n",
      "          'in production such as batch inference, bulk embeddings generation, '\n",
      "          'or model fine-tuning. Anyscale Jobs allow you to submit '\n",
      "          'applications developed on workspaces to a standalone Ray cluster '\n",
      "          'for execution. Built for production and designed to fit into your '\n",
      "          'CI/CD pipeline, jobs ensure scalable and reliable performance. How '\n",
      "          'does it work? # When you’re ready to promote an app to production, '\n",
      "          'submit a job from the workspace using anyscale job submit . '\n",
      "          'Anyscale Jobs have the following features: Scalability: Rapid '\n",
      "          'scaling to thousands of cloud instances, adjusting computing '\n",
      "          'resources to match application demand. Fault tolerance: Retries for '\n",
      "          'failures and automatic rescheduling to an alternative cluster for '\n",
      "          'unexpected failures like running out of memory. Monitoring and '\n",
      "          'observability: Persistent dashboards that allow you to observe '\n",
      "          'tasks in real time and email alerts upon successf ul job '\n",
      "          'completion. Get started 1. Sign in or sign up for an account. 2. '\n",
      "          'Select the Intro to Jobs example. 3. Select Launch. This example '\n",
      "          'runs in a Workspace. See Workspaces for background information. 4. '\n",
      "          'Follow the notebook or view it in the docs. 5. Terminate the '\n",
      "          \"Workspace when you're done. Ask AI \"\n",
      "          'https://docs.anyscale.com/platform/jobs/ 1/2 2/12/25, 9:48 AM Jobs '\n",
      "          '| Anyscale Docs https://docs.anyscale.com/platform/jobs/'}]\n"
     ]
    }
   ],
   "source": [
    "from pprint import pprint\n",
    "\n",
    "\n",
    "def reformat(chroma_results: dict) -> list:\n",
    "    \"\"\"\n",
    "    Reformat ChromaDB query results into a flat list of search items containing:\n",
    "    - chunk_index (1-based rank across all returned matches)\n",
    "    - chunk_id (the Chroma record ID)\n",
    "    - doc_id\n",
    "    - page_number\n",
    "    - source\n",
    "    - text (from documents)\n",
    "    - distance\n",
    "    - score\n",
    "\n",
    "    Parameters:\n",
    "        chroma_results (dict): The raw results from `collection.query`.\n",
    "\n",
    "    Returns:\n",
    "        list: A list of dictionaries with the keys above.\n",
    "    \"\"\"\n",
    "    reformatted = []\n",
    "\n",
    "    # Chroma returns one inner list per query embedding. A field that was not\n",
    "    # requested via `include` comes back as None, so fall back to an empty list.\n",
    "    ids = chroma_results.get(\"ids\") or []\n",
    "    metadatas = chroma_results.get(\"metadatas\") or []\n",
    "    documents = chroma_results.get(\"documents\") or []\n",
    "    distances = chroma_results.get(\"distances\") or []\n",
    "\n",
    "    chunk_index = 1\n",
    "    for id_group, meta_group, doc_group, distance_group in zip(ids, metadatas, documents, distances):\n",
    "        # Iterate over the matches returned for one query embedding\n",
    "        for chunk_id, meta, text, distance in zip(id_group, meta_group, doc_group, distance_group):\n",
    "            item = {\n",
    "                \"chunk_index\": chunk_index,\n",
    "                # The stored metadata has no chunk_id field; the record ID lives in `ids`\n",
    "                \"chunk_id\": chunk_id,\n",
    "                \"doc_id\": meta.get(\"doc_id\"),\n",
    "                \"page_number\": meta.get(\"page_number\"),\n",
    "                \"source\": meta.get(\"source\"),\n",
    "                \"text\": text,\n",
    "                \"distance\": distance,\n",
    "                # 1 - distance equals cosine similarity only if the collection uses\n",
    "                # cosine distance; with Chroma's default squared-L2 space, treat it\n",
    "                # as a relative score (higher means more similar)\n",
    "                \"score\": 1 - distance\n",
    "            }\n",
    "            reformatted.append(item)\n",
    "            chunk_index += 1\n",
    "\n",
    "    return reformatted\n",
    "\n",
    "\n",
    "print(\"Reformatted results:\")\n",
    "pprint(reformat(results))"
   ]
  },
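  {
   "cell_type": "markdown",
   "id": "3b9d6f1e",
   "metadata": {},
   "source": [
    "The `score` above is computed as `1 - distance`, which matches cosine similarity only when the collection uses cosine distance. The following minimal pure-Python sketch (not part of the Chroma API) shows the conversion for two distance spaces, assuming, as we understand Chroma's HNSW settings, that the `cosine` space reports `1 - cosine similarity` and the default `l2` space reports the squared Euclidean distance. For unit-length embeddings, squared L2 distance equals `2 - 2 * cos`, so cosine similarity can be recovered as `1 - distance / 2`. The helper names (`normalize`, `cosine_similarity`, `squared_l2`, `score_from_distance`) are hypothetical.\n",
    "\n",
    "```python\n",
    "import math\n",
    "\n",
    "\n",
    "def normalize(v):\n",
    "    # Scale a vector to unit length\n",
    "    norm = math.sqrt(sum(x * x for x in v))\n",
    "    return [x / norm for x in v]\n",
    "\n",
    "\n",
    "def cosine_similarity(a, b):\n",
    "    # Dot product divided by the product of the vector norms\n",
    "    dot = sum(x * y for x, y in zip(a, b))\n",
    "    norm_a = math.sqrt(sum(x * x for x in a))\n",
    "    norm_b = math.sqrt(sum(y * y for y in b))\n",
    "    return dot / (norm_a * norm_b)\n",
    "\n",
    "\n",
    "def squared_l2(a, b):\n",
    "    # Squared Euclidean distance (no square root)\n",
    "    return sum((x - y) ** 2 for x, y in zip(a, b))\n",
    "\n",
    "\n",
    "def score_from_distance(distance, space='cosine'):\n",
    "    # Convert a distance into a cosine-similarity score (higher is more similar).\n",
    "    # The 'l2' branch assumes unit-length embeddings.\n",
    "    if space == 'cosine':\n",
    "        return 1.0 - distance\n",
    "    if space == 'l2':\n",
    "        return 1.0 - distance / 2.0\n",
    "    raise ValueError('unsupported space: ' + space)\n",
    "\n",
    "\n",
    "a = normalize([1.0, 2.0, 3.0])\n",
    "b = normalize([2.0, 1.0, 3.5])\n",
    "print(cosine_similarity(a, b))\n",
    "print(score_from_distance(squared_l2(a, b), space='l2'))  # same value, up to float rounding\n",
    "```\n",
    "\n",
    "If your embedding model does not return unit-length vectors, the `l2` conversion does not hold. In that case, configuring the collection to use cosine distance is a simpler way to get interpretable scores."
   ]
  },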
  {
   "cell_type": "markdown",
   "id": "12560f6c",
   "metadata": {},
   "source": [
    "## Why This Approach Doesn't Scale"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "e3926062",
   "metadata": {},
   "source": [
    "This ingestion pipeline processes documents sequentially on a single machine, with no parallelism. Loading, chunking, and embedding run one document at a time, so total ingestion time grows linearly with the number of documents. Scaling to thousands of documents, each potentially containing dozens or even hundreds of pages, can quickly overwhelm the CPU and memory of one machine.\n",
    "\n",
    "Embedding all text chunks in a single call makes this worse: every chunk and every resulting embedding must fit in memory at the same time, which can exhaust memory or degrade performance on large corpora.\n",
    "\n",
    "In the next notebook, we demonstrate how to use **Ray Data** to build a scalable document ingestion pipeline. Ray's distributed execution spreads these steps across multiple workers and machines, which removes the single-machine bottleneck and enables large-scale document ingestion."
   ]
  }
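  ,
  {
   "cell_type": "markdown",
   "id": "8e2c5a47",
   "metadata": {},
   "source": [
    "Before switching to Ray, one single-machine mitigation for the memory issue is to embed and store chunks in fixed-size batches instead of all at once. The following sketch is an illustration under assumptions, not code from this pipeline: `batched` and `ingest_in_batches` are hypothetical helpers, each chunk is assumed to be a dict with `id`, `text`, and `metadata` keys, and `embed_fn`/`add_fn` stand in for something like `lambda texts: embed_model.encode(texts).tolist()` and `collection.add`.\n",
    "\n",
    "```python\n",
    "def batched(items, batch_size):\n",
    "    # Yield consecutive slices of at most batch_size items\n",
    "    if batch_size <= 0:\n",
    "        raise ValueError('batch_size must be positive')\n",
    "    for start in range(0, len(items), batch_size):\n",
    "        yield items[start:start + batch_size]\n",
    "\n",
    "\n",
    "def ingest_in_batches(chunks, embed_fn, add_fn, batch_size=64):\n",
    "    # Embed and store one batch at a time so that only one batch of\n",
    "    # embeddings is held in memory. embed_fn and add_fn are placeholders\n",
    "    # (hypothetical), e.g. a wrapper around embed_model.encode and collection.add.\n",
    "    stored = 0\n",
    "    for batch in batched(chunks, batch_size):\n",
    "        texts = [chunk['text'] for chunk in batch]\n",
    "        add_fn(\n",
    "            ids=[chunk['id'] for chunk in batch],\n",
    "            embeddings=embed_fn(texts),\n",
    "            metadatas=[chunk['metadata'] for chunk in batch],\n",
    "            documents=texts,\n",
    "        )\n",
    "        stored += len(batch)\n",
    "    return stored\n",
    "\n",
    "\n",
    "# Toy usage with fake embedding and storage functions\n",
    "calls = []\n",
    "\n",
    "\n",
    "def fake_embed(texts):\n",
    "    return [[float(len(t))] for t in texts]\n",
    "\n",
    "\n",
    "def fake_add(**kwargs):\n",
    "    calls.append(len(kwargs['ids']))\n",
    "\n",
    "\n",
    "toy_chunks = [{'id': str(i), 'text': 'x' * i, 'metadata': {'n': i}} for i in range(10)]\n",
    "print(ingest_in_batches(toy_chunks, fake_embed, fake_add, batch_size=4))  # 10\n",
    "print(calls)  # [4, 4, 2]\n",
    "```\n",
    "\n",
    "Batching keeps the memory used for embeddings to roughly one batch at a time, but documents are still processed one after another, so it does not remove the sequential bottleneck."
   ]
  }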
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.11"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
